/* -------------------------------------------------------------------------
 *
 * gistvacuum.cpp
 *	  vacuuming routines for the openGauss GiST index access method.
 *
 *
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/gausskernel/storage/access/gist/gistvacuum.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/genam.h"
#include "access/gist_private.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"

/*
 * gistvacuumcleanup() -- post-VACUUM cleanup pass: refresh the index FSM.
 *
 * Visits every block after the root, reports each new or deleted page to the
 * free space map, vacuums the FSM, and fills in the page counters of the
 * statistics struct.
 *
 * fmgr arguments:
 *   0: IndexVacuumInfo *        vacuum context for this index
 *   1: IndexBulkDeleteResult *  stats handed over from gistbulkdelete, or NULL
 *                               if gistbulkdelete was not called
 *
 * Returns the statistics struct (palloc0'd here when none was passed in).  In
 * ANALYZE ONLY mode the incoming pointer is returned untouched.
 */
Datum gistvacuumcleanup(PG_FUNCTION_ARGS)
{
    IndexVacuumInfo *info = (IndexVacuumInfo *)PG_GETARG_POINTER(0);
    IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *)PG_GETARG_POINTER(1);
    Relation rel = info->index;
    BlockNumber nblocks;
    BlockNumber cur;
    BlockNumber freeCount = 0;

    /* Nothing to do when only ANALYZE is running. */
    if (info->analyze_only) {
        PG_RETURN_POINTER(stats);
    }

    if (stats == NULL) {
        /* gistbulkdelete never ran: start from zeroed stats. */
        stats = (IndexBulkDeleteResult *)palloc0(sizeof(IndexBulkDeleteResult));

        /*
         * Borrow the heap's tuple count.
         *
         * XXX this is not right for a partial index.  Would returning NULL
         * be acceptable instead, or is the work below mandatory?
         */
        stats->num_index_tuples = info->num_heap_tuples;
        stats->estimated_count = info->estimated_count;
    }

    /* The extension lock is needed unless the relation is backend-local. */
    const bool mustLock = !RELATION_IS_LOCAL(rel);

    /* Take a snapshot of the relation size; the scan below stops there. */
    if (mustLock) {
        LockRelationForExtension(rel, ExclusiveLock);
    }
    nblocks = RelationGetNumberOfBlocks(rel);
    if (mustLock) {
        UnlockRelationForExtension(rel, ExclusiveLock);
    }

    /* Look for reusable pages in every block after the root. */
    cur = GIST_ROOT_BLKNO + 1;
    while (cur < nblocks) {
        Buffer buf;
        Page pg;

        vacuum_delay_point();

        buf = ReadBufferExtended(rel, MAIN_FORKNUM, cur, RBM_NORMAL, info->strategy);
        LockBuffer(buf, GIST_SHARE);
        pg = (Page)BufferGetPage(buf);

        if (PageIsNew(pg) || GistPageIsDeleted(pg)) {
            freeCount++;
            RecordFreeIndexPage(rel, cur);
        }

        UnlockReleaseBuffer(buf);
        cur++;
    }

    /* With all free pages recorded, vacuum the FSM itself. */
    IndexFreeSpaceMapVacuum(rel);

    /* Publish the counters; the relation size is re-read under the lock. */
    stats->pages_free = freeCount;

    if (mustLock) {
        LockRelationForExtension(rel, ExclusiveLock);
    }
    stats->num_pages = RelationGetNumberOfBlocks(rel);
    if (mustLock) {
        UnlockRelationForExtension(rel, ExclusiveLock);
    }

    PG_RETURN_POINTER(stats);
}

/*
 * One entry of the singly linked work list used by gistbulkdelete to walk
 * the index: a page still to be visited plus the LSN needed to detect a
 * concurrent split of that page.
 */
typedef struct GistBDItem {
    GistNSN parentlsn;       /* LSN of the parent page when the downlink was read;
                              * left zeroed for the initial root entry */
    BlockNumber blkno;       /* block number of the page to visit */
    struct GistBDItem *next; /* next entry in the work list, or NULL */
} GistBDItem;

/*
 * pushStackIfSplited() -- queue the right sibling of a concurrently split page.
 *
 * 'page' is the page belonging to the work-list entry 'stack'.  When the page
 * is not the root, the entry carries a valid parent LSN, and the page either
 * has its follow-right flag set or an NSN newer than that parent LSN, the
 * page's right sibling is linked into the work list immediately after
 * 'stack'.  The new entry inherits the parent LSN of 'stack'.
 */
static void pushStackIfSplited(Page page, GistBDItem *stack)
{
    GISTPageOpaque opaque = GistPageGetOpaque(page);
    GistBDItem *sibling = NULL;

    /* The root entry is never treated as split here. */
    if (stack->blkno == GIST_ROOT_BLKNO) {
        return;
    }

    /* Without a parent LSN there is nothing to compare the NSN against. */
    if (XLogRecPtrIsInvalid(stack->parentlsn)) {
        return;
    }

    /* Neither flagged as follow-right nor split since the parent was read. */
    if (!GistFollowRight(page) && !XLByteLT(stack->parentlsn, opaque->nsn)) {
        return;
    }

    /* Sanity check: a split page must have a right link to follow. */
    if (opaque->rightlink == InvalidBlockNumber) {
        return;
    }

    /* Split detected: insert the right sibling just behind the current entry. */
    sibling = (GistBDItem *)palloc(sizeof(GistBDItem));
    sibling->parentlsn = stack->parentlsn;
    sibling->blkno = opaque->rightlink;
    sibling->next = stack->next;
    stack->next = sibling;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples, plus
 * a check for invalid tuples left over after an upgrade.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * fmgr arguments:
 *   0: IndexVacuumInfo *        vacuum context for this index
 *   1: IndexBulkDeleteResult *  stats from a previous call, or NULL
 *   2: IndexBulkDeleteCallback  decides whether a heap TID is being deleted
 *   3: void *                   opaque state passed through to the callback
 *
 * The index is walked from the root using an explicit work list of
 * GistBDItem entries.  Inner pages contribute one entry per downlink; leaf
 * pages are re-locked exclusively and have their deletable tuples removed.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 * num_index_tuples is recounted from zero on every call, while
 * tuples_removed is only ever incremented, so it accumulates when the same
 * stats struct is passed back in.
 */
Datum gistbulkdelete(PG_FUNCTION_ARGS)
{
    IndexVacuumInfo *info = (IndexVacuumInfo *)PG_GETARG_POINTER(0);
    IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *)PG_GETARG_POINTER(1);
    IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback)PG_GETARG_POINTER(2);
    void *callback_state = (void *)PG_GETARG_POINTER(3);
    Relation rel = info->index;
    GistBDItem *stack = NULL;
    GistBDItem *ptr = NULL;

    /* first time through? */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *)palloc0(sizeof(IndexBulkDeleteResult));
    /* we'll re-count the tuples each time */
    stats->estimated_count = false;
    stats->num_index_tuples = 0;

    /*
     * Seed the work list with the root page.  palloc0 leaves parentlsn zeroed
     * and next NULL for this first entry.
     */
    stack = (GistBDItem *)palloc0(sizeof(GistBDItem));
    stack->blkno = GIST_ROOT_BLKNO;

    /* Process work-list entries until none are left. */
    while (stack != NULL) {
        Buffer buffer;
        Page page;
        OffsetNumber i, maxoff;
        IndexTuple idxtuple;
        ItemId iid;

        /* Read the page under a share lock first to find out what kind it is. */
        buffer = ReadBufferExtended(rel, MAIN_FORKNUM, stack->blkno, RBM_NORMAL, info->strategy);
        LockBuffer(buffer, GIST_SHARE);
        gistcheckpage(rel, buffer);
        page = (Page)BufferGetPage(buffer);
        if (GistPageIsLeaf(page)) {
            OffsetNumber todelete[MaxOffsetNumber];
            int ntodelete = 0;

            /*
             * Tuples can only be removed under an exclusive lock.  The share
             * lock is dropped before the exclusive lock is taken, so the page
             * has to be re-examined afterwards.
             */
            LockBuffer(buffer, GIST_UNLOCK);
            LockBuffer(buffer, GIST_EXCLUSIVE);

            page = (Page)BufferGetPage(buffer);
            if (stack->blkno == GIST_ROOT_BLKNO && !GistPageIsLeaf(page)) {
                /* only the root can become non-leaf during relock */
                UnlockReleaseBuffer(buffer);
                /*
                 * Retry: the work-list entry is not popped, so the next
                 * iteration reads the root again and handles it as an inner
                 * page.
                 */
                continue;
            }

            /*
             * Check for a split that happened after the parent was examined;
             * this must be done after the relock.
             */
            pushStackIfSplited(page, stack);

            /* Remove deletable tuples from page */
            maxoff = PageGetMaxOffsetNumber(page);

            for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
                iid = PageGetItemId(page, i);
                idxtuple = (IndexTuple)PageGetItem(page, iid);
                if (callback(&(idxtuple->t_tid), callback_state, InvalidOid, InvalidBktId)) {
                    /*
                     * Record the offset reduced by the number of deletions
                     * already recorded, i.e. the position this tuple will
                     * have once the earlier ones have been removed one by one
                     * below (assumes PageIndexTupleDelete shifts later
                     * offsets down by one -- confirm against bufpage).
                     */
                    todelete[ntodelete] = i - ntodelete;
                    ntodelete++;
                    stats->tuples_removed += 1;
                } else
                    stats->num_index_tuples += 1;
            }

            if (ntodelete) {
                /* Page modification and its WAL record form one critical section. */
                START_CRIT_SECTION();

                MarkBufferDirty(buffer);

                /* Delete in the order recorded; the offsets are pre-adjusted. */
                for (i = 0; i < ntodelete; i++)
                    PageIndexTupleDelete(page, todelete[i]);
                GistMarkTuplesDeleted(page);

                if (RelationNeedsWAL(rel)) {
                    XLogRecPtr recptr;

                    /* Log the deletions and stamp the page with the record's LSN. */
                    recptr = gistXLogUpdate(buffer, todelete, ntodelete, NULL, 0, InvalidBuffer);
                    PageSetLSN(page, recptr);
                } else
                    /* No WAL for this relation: stamp the page with the LSN used for temp relations. */
                    PageSetLSN(page, GetXLogRecPtrForTemp());

                END_CRIT_SECTION();
            }
        } else {
            /* Check for a split that happened after the parent was examined. */
            pushStackIfSplited(page, stack);

            maxoff = PageGetMaxOffsetNumber(page);

            /* Inner page: queue every child referenced by a downlink. */
            for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
                iid = PageGetItemId(page, i);
                idxtuple = (IndexTuple)PageGetItem(page, iid);

                /*
                 * Insert the child right behind the current entry, remembering
                 * this page's LSN so a later split of the child can be detected.
                 */
                ptr = (GistBDItem *)palloc(sizeof(GistBDItem));
                ptr->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
                ptr->parentlsn = PageGetLSN(page);
                ptr->next = stack->next;
                stack->next = ptr;

                /* Invalid inner tuples are only reported at LOG level; the scan continues. */
                if (GistTupleIsInvalid(idxtuple))
                    ereport(LOG,
                            (errmsg("index \"%s\" contains an inner tuple marked as invalid",
                                    RelationGetRelationName(rel)),
                             errdetail("This is caused by an incomplete page split at crash recovery before upgrading "
                                       "to openGauss 1.0.0"),
                             errhint("Please REINDEX it.")));
            }
        }

        UnlockReleaseBuffer(buffer);

        /* Pop the finished entry and move on to the next one. */
        ptr = stack->next;
        pfree(stack);
        stack = ptr;

        vacuum_delay_point();
    }

    PG_RETURN_POINTER(stats);
}
